package lyw.app.dwd.db;

import lyw.app.BaseSqlApp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: lyw
 * @Date: 2022/8/31 9:34
 * @Describe:
 */
public class DwdUserInfo extends BaseSqlApp {
    public static void main(String[] args) {
        new DwdUserInfo().init(
                3001,
                2,
                "DwdUserInfo"
        );
    }

    @Override
    protected void handle(StreamExecutionEnvironment env,
                          StreamTableEnvironment tEnv) {
        // 1. 读取ods_db数据
        readOdsDb(tEnv, "DwdUserInfo");

        // 2. 过滤user_info数据
        Table user_ifo = tEnv.sqlQuery("select " +
                " data['id'] id, " +       //
                " data['create_time'] create_time, " +      //
                " ts, " +
                " pt " +
                " from ods_db " +
                " where `database`='edu' " +
                " and `table`='user_info' " +
                " and (" +
                " `type`='insert' " +
                "  or `type`='update' " +
                ")");
        tEnv.createTemporaryView("user_info", user_ifo);


    }
}
